package com.chief.sink.rocket;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

import java.nio.charset.StandardCharsets;

public class RocketMqSink extends RichSinkFunction<String> {

    DefaultMQProducer defaultMQProducer = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        defaultMQProducer = new DefaultMQProducer("sinkProducerGroup");
        defaultMQProducer.setNamesrvAddr("192.168.10.54:9876;192.168.10.55:9876");
        defaultMQProducer.setSendMsgTimeout(88888);
        defaultMQProducer.start();
    }

    @Override
    public void close() throws Exception {
        defaultMQProducer.shutdown();
        defaultMQProducer = null;
    }

    @Override
    public void invoke(String value, Context context) throws Exception {
        defaultMQProducer.send(new Message("sinkTopic", value.getBytes(StandardCharsets.UTF_8)));
    }
}
